Spark RDD常用算子操作(八) 键值对关联操作 subtractByKey, join,fullOuterJoin, rightOuterJoin, leftOuterJoin

原文作者:翟开顺
首发:CSDN
本人仅为自己方便查阅做了摘抄,请支持原作者
原文地址:https://blog.csdn.net/t1dmzks/article/details/72077428

键值对关联操作 subtractByKey, join,fullOuterJoin, rightOuterJoin, leftOuterJoin

github: https://github.com/zhaikaishun/spark_tutorial/tree/master/src/main/java/com/spark/rdd_tutorial/tutorial8
先从spark-learning中的一张图大致了解其功能
键值对操作

subtractByKey

函数定义

1
2
3
4
5
def subtractByKey[W](other: RDD[(K, W)])(implicit arg0: ClassTag[W]): RDD[(K, V)]

def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int)(implicit arg0: ClassTag[W]): RDD[(K, V)]

def subtractByKey[W](other: RDD[(K, W)], p: Partitioner)(implicit arg0: ClassTag[W]): RDD[(K, V)]

类似于subtrac,删掉 RDD 中键与 other RDD 中的键相同的元素

join

函数定义

1
2
3
4
5
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

RDD1.join(RDD2)
可以把RDD1,RDD2中的相同的key给连接起来,类似于sql中的join操作

fullOuterJoin

和join类似,不过这是全连接

leftOuterJoin

1
2
3
4
5
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]

def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]

直接看图即可
对两个 RDD 进行连接操作,类似于sql中的左外连接

rightOuterJoin

对两个 RDD 进行连接操作,类似于sql中的右外连接,存在的话,value用的Some, 不存在用的None,具体的看上面的图和下面的代码即可

代码示例

scala语言

1
2
3
4
5
6
7
8
9
10
11
12
13
14
scala> val rdd = sc.makeRDD(Array((1,2),(3,4),(3,6)))
scala> val other = sc.makeRDD(Array((3,9)))

scala> rdd.subtractByKey(other).collect()
res0: Array[(Int, Int)] = Array((1,2))

scala> rdd.join(other).collect()
res1: Array[(Int, (Int, Int))] = Array((3,(4,9)), (3,(6,9)))

scala> rdd.leftOuterJoin(other).collect()
res2: Array[(Int, (Int, Option[Int]))] = Array((1,(2,None)), (3,(4,Some(9))), (3,(6,Some(9))))

scala> rdd.rightOuterJoin(other).collect()
res3: Array[(Int, (Option[Int], Int))] = Array((3,(Some(4),9)), (3,(Some(6),9)))

java语言

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Map;

public class JoinRDD {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("ReduceByKey").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
sc.setLogLevel("WARN");

JavaRDD<Tuple2<Integer,Integer>> rddPre = sc.parallelize(Arrays.asList(new Tuple2(1,2)
, new Tuple2(3,4)
, new Tuple2(3,6)));
JavaRDD<Tuple2<Integer,Integer>> otherPre = sc.parallelize(Arrays.asList(new Tuple2(3,10),new Tuple2(4,8)));

//JavaRDD转换成JavaPairRDD
JavaPairRDD<Integer, Integer> rdd = JavaPairRDD.fromJavaRDD(rddPre);
JavaPairRDD<Integer, Integer> other = JavaPairRDD.fromJavaRDD(otherPre);
//subtractByKey
JavaPairRDD<Integer, Integer> subRDD = rdd.subtractByKey(other);

//join
JavaPairRDD<Integer, Tuple2<Integer, Integer>> joinRDD = rdd.join(other);
//fullOutJoin
JavaPairRDD<Integer, Tuple2<Optional<Integer>, Optional<Integer>>> fullOutJoinRDD = rdd.fullOuterJoin(other);
//leftOuterJoin
JavaPairRDD<Integer, Tuple2<Integer, Optional<Integer>>> leftOutJoinRDD = rdd.leftOuterJoin(other);

//rightOutJoin
JavaPairRDD<Integer, Tuple2<Optional<Integer>, Integer>> rightOutJoinRDD = rdd.rightOuterJoin(other);
//输出看效果
Map<Integer, Integer> subMap = subRDD.collectAsMap();
System.out.println("-------------subRDD-------------");
for (Integer key : subMap.keySet()) {
System.out.println("subRDD: "+key+", "+subMap.get(key));
}
Map<Integer, Tuple2<Integer, Integer>> joinMap = joinRDD.collectAsMap();
System.out.println("-------------joinRDD-------------");
for (Integer key : joinMap.keySet()) {
System.out.println("join: "+key+", Tuple("+joinMap.get(key)._1+","+joinMap.get(key)._2+")");
}
Map<Integer, Tuple2<Optional<Integer>, Optional<Integer>>> fullOutJoinMap = fullOutJoinRDD.collectAsMap();
System.out.println("-------------fullOutJoinRDD-------------");
for (Integer key : fullOutJoinMap.keySet()) {
System.out.println("fullOutJoinRDD: "+key+", Tuple("+fullOutJoinMap.get(key)._1+","+fullOutJoinMap.get(key)._2+")");
}

Map<Integer, Tuple2<Integer, Optional<Integer>>> leftOutJoinMap = leftOutJoinRDD.collectAsMap();
System.out.println("-------------leftOutJoinRDD-------------");
for (Integer key : leftOutJoinMap.keySet()) {
System.out.println("leftOutJoinRDD: "+key+", Tuple("+leftOutJoinMap.get(key)._1+","+leftOutJoinMap.get(key)._2+")");
}

Map<Integer, Tuple2<Optional<Integer>, Integer>> rightOutJoinMap = rightOutJoinRDD.collectAsMap();
System.out.println("-------------rightOutJoinRDD-------------");
for (Integer key : rightOutJoinMap.keySet()) {
System.out.println("rightOutJoinRDD: "+key+", Tuple("+rightOutJoinMap.get(key)._1+","+rightOutJoinMap.get(key)._2+")");
}

}
}

运行后显示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-------------subRDD-------------
subRDD: 1, 2
-------------joinRDD-------------
join: 3, Tuple(6,10)
-------------fullOutJoinRDD-------------
fullOutJoinRDD: 4, Tuple(Optional.empty,Optional[8])
fullOutJoinRDD: 1, Tuple(Optional[2],Optional.empty)
fullOutJoinRDD: 3, Tuple(Optional[6],Optional[10])
-------------leftOutJoinRDD-------------
leftOutJoinRDD: 1, Tuple(2,Optional.empty)
leftOutJoinRDD: 3, Tuple(6,Optional[10])
-------------rightOutJoinRDD-------------
rightOutJoinRDD: 4, Tuple(Optional.empty,8)
rightOutJoinRDD: 3, Tuple(Optional[6],10)

0%